56. JUC

JUC

JUC 全称是 java.util.concurrent,作用是在并发编程中使用的工具类

进程的状态

NEW,(新建)
RUNNABLE,(准备就绪)
BLOCKED,(阻塞)
WAITING,(不见不散)
TIMED_WAITING,(过时不候)
TERMINATED, (终结)

wait/sleep 的区别

功能都是当前线程暂停,区别在于 wait 是放开手去睡,放开手里的锁。sleep 是握紧手去睡,醒了手里还有锁。

并发/并行

并发:同一时刻多个线程在访问同一个资源,多个线程对一个点。例子,小米9今天上午10点,限量抢购,春运抢票,电商秒杀…
并行:多项工作一起执行,之后再汇总。例子,泡方便面,电水壶烧水,一边撕调料倒入桶中

Lock

Lock 和 Synchronized 的区别:Lock 实现提供了比使用 Synchronized 方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。而且 Lock 是手动上锁和解锁,Synchronized 则控制不了。

ReentrantLock

可重入锁,Lock 接口的实现,代码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.atguigu.juc;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Ticket {
private int number = 30;
private Lock lock = new ReentrantLock();

public void sale() {
// 创建锁
lock.lock();
try {
// 买票操作
if(number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出:" + number-- + "号票,还剩:" + number + "张");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}
}
}

public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
// 售票员 AA, lambda 表达式
new Thread(()-> {for (int i = 0; i < 40; i++) ticket.sale();}, "AA").start();
// 售票员 BB, lambda 表达式
new Thread(()-> {for (int i = 0; i < 40; i++) ticket.sale();}, "BB").start();
// 售票员 CC, lambda 表达式
new Thread(()-> {for (int i = 0; i < 40; i++) ticket.sale();}, "CC").start();

// new Thread(new Runnable() {
// @Override
// public void run() {
// for (int i = 0; i < 40; i++) {
// ticket.sale();
// }
// }
// }, "AA").start();
//
// // 售票员 BB
// new Thread(new Runnable() {
// @Override
// public void run() {
// for (int i = 0; i < 40; i++) {
// ticket.sale();
// }
// }
// }, "BB").start();
//
// // 售票员 CC
// new Thread(new Runnable() {
// @Override
// public void run() {
// for (int i = 0; i < 40; i++) {
// ticket.sale();
// }
// }
// }, "CC").start();
}
}

java8特性

函数式接口

在 java.util.function 包下面的都是函数式接口。函数式接口都有 @FunctionalInterface 注解。

内置四大函数式接口

Consumer<T> : 消费型接口,有参无返回值。范型 T 是参数类型
Supplier<T> : 供给型接口,无参有返回值。范型 T 是返回值类型
Function<T, R> : 函数型接口,有参有返回值。范型 T 是参数类型,R 是返回值类型
Predicate<T> : 断定型接口,有参,返回值为 bool。范型 T 是参数类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.atguigu.juc;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class FunctionalInterfaceDemo {
public static void main(String[] args) {
// void accept(T t);
Consumer<String> consumer = t-> {
System.out.println(t);
};
consumer.accept("ABC"); // ABC


// T get();
Supplier<String> supplier = () -> {
return "DEF";
};
System.out.println(supplier.get()); // DEF


// R apply(T t);
Function<String, Integer> function = (t) ->{
return t.length();
};
System.out.println(function.apply("zhangsan")); // 8


// boolean test(T t);
Predicate<String> predicate = (t) -> {
return t.length() > 10?true:false;
};
System.out.println(predicate.test("zhangsan")); // false
}
}

Stream 流

Stream 流是数据渠道,用于操作数据源(集合,数组等)。“集合注重的是数据,流注重的是计算“。

特点:

  1. Stream 自己不会存储元素
  2. Stream 不会改变源对象,相反,会返回一个新的持有结果的 Stream 流
  3. Stream 的操作是延迟的,这意味着他们会等到需要结果的时候才执行

例子:

按照给出的数据,找出偶数ID,年龄大于 24,用户名转为大写,用户名字幕倒序,只输出一个用于名字。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.atguigu.juc;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class User {
private int id;
private String userName;
private int age;

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

public User(int id, String userName, int age) {
this.id = id;
this.userName = userName;
this.age = age;
}

@Override
public String toString() {
return "User{" +
"id=" + id +
", userName='" + userName + '\'' +
", age=" + age +
'}';
}
}

/**
* 请按照给出数据,找出
* 偶数ID
* 年龄大于24
* 用户名转为大写
* 用户名字母倒排序
* 只输出一个
* 用户名字
*/
public class StreamDemo {
public static void main(String[] args) {
User u1 = new User(11, "a", 23);
User u2 = new User(12, "b", 24);
User u3 = new User(13, "c", 22);
User u4 = new User(14, "d", 28);
User u5 = new User(16, "e", 26);

List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
list.stream().filter((t) -> {
return t.getId() % 2 == 0; // 筛选出 偶数ID
}).filter((t) -> {
return t.getAge() > 24; // 筛选出 年龄大于24
}).map((t) -> {
return t.getUserName().toUpperCase(); // 用户名转为大写
}).sorted((o1, o2) -> {
return o2.compareTo(o1); // 用户名字母倒排序
}).limit(1).forEach(System.out::println); // E
}
}

lambda 表达式

Lambda 是一个匿名函数,我们可以把 Lambda 表达式理解为是一段可以传递的代码(将代码像数据一样进行传递)。可以写出更简洁、更灵活的代码。作为一种更紧凑的代码风格,使Java 的语言表达能力得到了提升。

Lambda 表达式在 Java 语言中引入了一个新的语法元素和操作符。这个操作符为 “->” , 该操作符被称为 Lambda 操作符或剪头操作符。它将 Lambda 分为两个部分,左侧:指定了 Lambda 表达式需要的所有参数,右侧:指定了 Lambda 体,即 Lambda 表达式要执行的功能。

使用 Lambda 表达式的条件是 lambda 表达式,必须是函数式接口,即必须只有一个方法,如果接口只有一个方法 java 默认它为函数式接口。为了正确使用Lambda表达式,需要给接口加个注解:@FunctionalInterface,如有两个方法,立刻报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.atguigu.juc;

public class LambdaDemo {
@FunctionalInterface // 函数式接口(只能有一个实现的方法)
interface Foo{
public int add(int x, int y);
}

public static void main(String[] args) {
// 实现方式1: 内部类实现
Foo foo = new Foo() {
@Override
public int add(int x, int y) {
return x + y;
}
};
System.out.println(foo.add(10, 5));

// 实现方式2: lambda 表达式实现(使用 lambda 的条件是必须是函数式接口,即只有一个实现的方法)
Foo foo1 = (int x, int y) -> {
return x + y;
};
System.out.println(foo1.add(10, 5));
}
}

接口里是否能有实现方法

代码地址

在 Java 8 中接口新增了 default 方法和静态方法。即,一个接口中可以有多个 default 方法,也可以有多个静态当法。如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.atguigu.juc;

public class LambdaDemo {
@FunctionalInterface // 函数式接口(只能有一个实现的方法)
interface Foo{
public int add(int x, int y);

// default 方法
default int sub(int x, int y){
return x - y;
}
default int div(int x, int y){
return x / y;
}

// 静态方法
public static int aa(int x, int y){
return x + y + 100;
}
public static int bb(int x, int y){
return x - y - 100;
}
}

public static void main(String[] args) {
// 实现方式1: 内部类实现
Foo foo = new Foo() {
@Override
public int add(int x, int y) {
return x + y;
}
};
System.out.println(foo.add(10, 5));

// 实现方式2: lambda 表达式实现(使用 lambda 的条件是必须是函数式接口,即只有一个实现的方法)
Foo foo1 = (int x, int y) -> {
return x + y;
};
System.out.println(foo1.add(10, 5));
System.out.println(foo1.div(10, 5));
System.out.println(foo1.sub(10, 5));
}
}

线程间通信

需求: 四个线程 ABCD。两个线程进行 + 1。另外个线程进行 -1。要求 +1 和 -1 交替进行。每个线程执行 10 次

synchronized 实现

代码地址

注意⚠️:在使用 object 中的 wait 方法的时候,一定要为 wait 使用 while 循环。if 判断只会判断一次,如果使用 if 当线程再次被唤醒的时候,因为执行过判断了,就有可能绕过条件(以前判断过了,认为没问题,但是其实已经不满足条件了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package com.atguigu.juc;

/**
* 四个线程 ABCD。两个线程进行 + 1。另外个线程进行 -1
* 要求 +1 和 -1 交替进行。每个线程执行 10 次
*/
public class NotifyWaitDemo {
private int number = 0;//初始值为零的一个变量

public synchronized void increment() throws InterruptedException {
// 1判断
// 这里使用 while 判断而不使用 if 的原因是因为 if 只判断一次
// 如果使用 if 判断,线程被唤醒的时候不在进行判断,那么就有可能会绕过条件
// 使用 while 的目的就是让线程每次被唤醒的时候,每次都进行判断
while (number !=0 ) {
this.wait();
}
//2干活
number++;
System.out.println(Thread.currentThread().getName()+"\t"+number);
//3通知
this.notifyAll();
}

public synchronized void decrement() throws InterruptedException {
// 1判断
// 这里使用 while 判断而不使用 if 的原因是因为 if 只判断一次
// 如果使用 if 判断,线程被唤醒的时候不在进行判断,那么就有可能会绕过条件
// 使用 while 的目的就是让线程每次被唤醒的时候,每次都进行判断
while (number == 0) {
this.wait();
}
// 2干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3通知
this.notifyAll();
}

public static void main(String[] args) {
NotifyWaitDemo notifyWaitDemo = new NotifyWaitDemo();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
notifyWaitDemo.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "线程A").start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
notifyWaitDemo.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "线程B").start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
notifyWaitDemo.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "线程C").start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
notifyWaitDemo.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "线程D").start();
}
}
1
2
3
4
5
6
7
线程B	0
线程A 1
线程B 0
线程A 1
线程B 0
......
线程D 0

上述代码中,如果不使用 while ,那么就会出现值大于1或者小于 0 的情况。

lock 实现

代码地址

与 synchronized 相比,lock 中实现释放当前线程资源的方法是 Condition 中的 await 方法。唤醒其他线程的方法是 Condition 中的 signal 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package com.atguigu.juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 四个线程 ABCD。两个线程进行 + 1。另外个线程进行 -1
* 要求 +1 和 -1 交替进行。每个线程执行 10 次
*/
public class LockSingalAwait {
private int number = 0;//初始值为零的一个变量
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void increment() throws InterruptedException {
lock.lock();
try {
while (number !=0 ) {
condition.await();
}
//2干活
number++;
System.out.println(Thread.currentThread().getName()+"\t"+number);
//3通知
condition.signalAll();
} finally {
lock.unlock();
}
}

public void decrement() throws InterruptedException {
lock.lock();
try {
while (number !=1 ) {
condition.await();
}
//2干活
number--;
System.out.println(Thread.currentThread().getName()+"\t"+number);
//3通知
condition.signalAll();
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
LockSingalAwait syncNotifyWait = new LockSingalAwait();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
syncNotifyWait.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "线程A").start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
syncNotifyWait.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "线程B").start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
syncNotifyWait.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "线程C").start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
syncNotifyWait.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "线程D").start();
}
}

线程间定制化通信

代码地址

需求:三个线程 AA,BB,CC,现要求,AA BB CC 按顺序执行,且 AA 打印5次,BB 打印 10次,CC 打印 15 次,然后循环10次。

在 lock 中我们可以唤醒指定的线程,只需要使用 signal 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package com.atguigu.juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 需求:三个线程 AA,BB,CC,现要求,AA BB CC 按顺序执行
* 且 AA 打印5次,BB 打印 10次,CC 打印 15 次,然后循环10次。
*/
public class LockSingalAwait2 {
// 线程标志,1 表示该 AA 执行,2 BB 执行,3 CC 执行
private int number = 1;
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition(); // AA 线程的 Condition
private Condition condition2 = lock.newCondition(); // BB 线程的 Condition
private Condition condition3 = lock.newCondition(); // CC 线程的 Condition

public void print5(int loopCount) throws InterruptedException {
lock.lock();
try {
// 判断 (不等于1的时候,需要等待)
while (number!=1){
condition1.await();
}
// 干活
for (int i = 1; i <= 5; i++) {
System.out.println("线程 :" + Thread.currentThread().getName() + " 打印:" + i + "次,当前第:" + loopCount + "轮");
}

// 通知 (修改标识位,并唤醒线程 BB)
number = 2;
condition2.signal();
} finally {
// 不管怎么样都要释放锁
lock.unlock();
}
}

public void print10(int loopCount) throws InterruptedException {
lock.lock();
try {
// 判断 (不等于2的时候,需要等待)
while (number!=2){
condition2.await();
}
// 干活
for (int i = 1; i <= 10; i++) {
System.out.println("线程 :" + Thread.currentThread().getName() + " 打印:" + i + "次,当前第:" + loopCount + "轮");
}

// 通知 (修改标识位,并唤醒线程 CC)
number = 3;
condition3.signal();
} finally {
// 不管怎么样都要释放锁
lock.unlock();
}
}

public void print15(int loopCount) throws InterruptedException {
lock.lock();
try {
// 判断 (不等于2的时候,需要等待)
while (number!=3){
condition3.await();
}
// 干活
for (int i = 1; i <= 15; i++) {
System.out.println("线程 :" + Thread.currentThread().getName() + " 打印:" + i + "次,当前第:" + loopCount + "轮");
}

// 通知 (修改标识位,并唤醒线程 AA【此处又回到 AA】)
number = 1;
condition1.signal();
} finally {
// 不管怎么样都要释放锁
lock.unlock();
}
}

public static void main(String[] args) {
LockSingalAwait2 lockSingalAwait2 = new LockSingalAwait2();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
try {
lockSingalAwait2.print5(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "AA").start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
try {
lockSingalAwait2.print10(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "BB").start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
try {
lockSingalAwait2.print15(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "CC").start();
}
}

上面代码中 AA 执行完成之后唤醒 BB 执行,BB 执行之后唤醒 CC,最后 CC 又唤醒 AA,直到最后循环 10 次。

线程不安全

Vector、HashTable、Properties 是线程安全的;

ArrayList、LinkedList、HashSet、TreeSet、HashMap、TreeMap 等都是线程不安全的。

List 线程安全

ArrayList 是线程不安全的,我们可以使用 CopyOnWriteArrayList。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.atguigu.juc;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ArrayListDemo {
public static void main(String[] args) {
List<String> list = new ArrayList<>();

for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
// 会出现并发修改异常 java.util.ConcurrentModificationException
list.add(UUID.randomUUID().toString());
System.out.println(list);
}
}, String.valueOf(i)).start();
}
}
}
1
2
3
4
5
6
7
8
java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at java.util.AbstractCollection.toString(AbstractCollection.java:461)
at java.lang.String.valueOf(String.java:2994)
at java.io.PrintStream.println(PrintStream.java:821)
at com.atguigu.juc.ArrayListDemo$1.run(ArrayListDemo.java:16)
at java.lang.Thread.run(Thread.java:748)

解决方法有三种,最好的是第三种,前两种都不推荐使用。代码地址

Vector

解决方法1:Vector。Vector 也是 List 的实现类,但是查看 Vector 的源码发现,Vector 的 add 方法上有 synchronized,所以 Vector 是线程安全的。但是实际使用过程中,并不推荐使用 Vector 因为很老了,而且速度很慢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static void noSafeList() {
// List<String> list = new ArrayList<>(); // 线程不安全
List<String> list = new Vector<>(); // 线程安全,但是效率低下

for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}
}, String.valueOf(i)).start();
}
}
Collections

解决方法2: Collections。 Collections 工具类提供了一些 synchronized方法,例如 synchronizedList,synchronizedMap, synchronizedSet 之类的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static void noSafeList() {
// List<String> list = new ArrayList<>(); // 线程不安全
// List<String> list = new Vector<>(); // 线程安全,但是效率低下
List<String> list = Collections.synchronizedList(new ArrayList<>()); // 线程安全

for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}
}, String.valueOf(i)).start();
}
}
CopyOnWriteArrayList

解决方法3: CopyOnWriteArrayList。CopyOnWriteArrayList 是 java.util.concurrent 包下的。底层原理是写时复制。对应的还有解决 set 线程不方法的方法 CopyOnWriteArraySet。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void noSafeList() {
// List<String> list = new ArrayList<>(); // 线程不安全
// List<String> list = new Vector<>(); // 线程安全,但是效率低下
// List<String> list = Collections.synchronizedList(new ArrayList<>()); // 线程安全
CopyOnWriteArrayList list = new CopyOnWriteArrayList(); // 线程安全,推荐使用

for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
list.add(UUID.randomUUID().toString());
System.out.println(list);
}
}, String.valueOf(i)).start();
}
}

CopyOnWriteArrayList 是 arraylist 的一种线程安全变体,其中所有可变操作(add、set等)都是通过生成底层数组的新副本来实现的。

CopyOnWrite 容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器 Object[] 添加,而是先将当前容器 Object[] 进行Copy,复制出一个新的容器 Object[] newElements,然后向新的容器Object[] newElements 里添加元素。添加元素后,再将原容器的引用指向新的容器。这样做的好处是可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器。如下图:

其他资料,CopyOnWrite 优缺点

Set 线程安全

HashSet 是线程不安全的,我们可以使用 CopyOnWriteArraySet。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static void noSafeSet() {
// Set set = new HashSet(); // 线程不安全
CopyOnWriteArraySet set = new CopyOnWriteArraySet(); // 线程安全,推荐使用

for (int i = 0; i < 30; i++) {
new Thread(new Runnable() {
@Override
public void run() {
set.add(UUID.randomUUID().toString());
System.out.println(set);
}
}, String.valueOf(i)).start();
}
}

HashSet 的底层是 HashMap 实现的,在 HashSet 中 add 新增的值,作为了 HashMap 的 key,value 是一个 PRESENT 常量。

1
2
3
4
5
6
7
// HashSet add 方法实现原理
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}

// HashSet 底层使用的是 HashMap,key 为 HashSet 元素的值,value 为 PRESENT
private static final Object PRESENT = new Object();

Map 线程安全

HashMap 也是线程不安全的,我们可以使用 ConcurrentHashMap。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static void noSafeMap() {
// HashMap<String, Object> map = new HashMap<>(); // 线程不安全
ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>();

for (int i = 0; i < 30; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), Thread.currentThread().getName());
System.out.println(map);
}
}, String.valueOf(i)).start();
}
}

Callable

代码地址

获得多线程的方式主要有四种,之前的继承 Thread 类和实现 Runable 接口。还有现在的实现 Callable 接口和 java 线程池。

Callable 接口和 Runable 的三点不同:

  1. Callable 实现的方法有返回值
  2. Callable 实现的方法有异常
  3. Callable 实现的方法是 call,Runable 实现的方法是 run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.atguigu.juc;

import java.util.concurrent.Callable;

class UseRunable implements Runnable{
@Override
public void run() {
}
}

class UseCallable implements Callable{
@Override
public Integer call() throws Exception {
return 200;
}
}

public class CallableDemo {
public static void main(String[] args) {

}
}

new Thread()的时候,可以传入一个 Runable 对象,Callable 是否也可以直接传入呢?答案是不可以的,但是可以借助一个中间人 FutureTask,FutureTask 实现了Runable接口,同时也接受 Callable 类型的实现。这样就能连接起来了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.atguigu.juc;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class UseRunable implements Runnable{
@Override
public void run() {}
}

class UseCallable implements Callable{
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName());
return 200;
}
}

public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new UseRunable(), "Runable 方式实现的线程").start();

FutureTask futureTask = new FutureTask<Integer>(new UseCallable());
new Thread(futureTask, "CallableD 方式实现的线程 - A").start();

// "CallableD 方式实现的线程 - B" 不会被执行
new Thread(futureTask, "CallableD 方式实现的线程 - B").start();
while (!futureTask.isDone()){
System.out.println("运算未完成!");
}
System.out.println(futureTask.get()); // 取值放在最后,不然 main 主进程会被阻塞。
}
}

在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给 FutureTask 对象在后台完成,当主线程将来需要时,就可以通过 FutureTask 对象获得后台作业的计算结果或者执行状态。仅在 FutureTask 计算完成时才能检索结果;如果计算尚未完成,就调用 get 方法获取结果,那么主进程就会被阻塞。而且一旦 FutureTask 计算完成,就不能再重新开始或取消计算。所以 FutureTask 有两点需要注意:

  1. 一个 FutureTask 对象只计算一次。
  2. get 方法需要放到最后,即主线程可以在完成自己的任务后,再去获取结果。(否则未计算完获取结果会被阻塞)

JUC 强大辅助类

CountDownLatch

代码地址

CountDownLatch 主要有两个方法,当一个或多个线程调用 await 方法时,这些线程会阻塞。

其它线程调用 countDown 方法会将计数器减1(调用 countDown 方法的线程不会阻塞),当计数器的值变为 0 时,因 await 方法阻塞的线程会被唤醒,继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.atguigu.juc;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行!!");
// 让 CountDownLatch 的计数进行减 1 操作
countDownLatch.countDown();
}
}, "线程" + String.valueOf(i)).start();
}
// 阻塞
countDownLatch.await();
System.out.println("循环创建的线程全部执行完毕,我最后执行");
}
}

CyclicBarrier

CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。线程进入屏障通过 CyclicBarrier 的 await() 方法。

举例:只有集齐七颗龙珠,才能召唤出神龙。源码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.atguigu.juc;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
public static void main(String[] args) {
// CyclicBarrier 第一个参数是需要到达的次数,第二个是达到次数后调用的 Runnable 方法
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, new Runnable() {
@Override
public void run() {
System.out.println("集齐七颗龙珠, 可以召唤神龙!!!");
}
});

//启动多个线程(如果将这里的循环次数改为6,那么是达不到 CyclicBarrier Runnable 方法执行的条件的)
for (int i = 1; i <= 7 ; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "被收集");
// 如果未达到 CyclicBarrier 设置的计数,那么会被阻塞
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "龙珠" + String.valueOf(i)).start();
}
}
}
1
2
3
4
5
6
7
8
龙珠2被收集
龙珠4被收集
龙珠3被收集
龙珠1被收集
龙珠6被收集
龙珠5被收集
龙珠7被收集
集齐七颗龙珠, 可以召唤神龙!!!

Semaphore

Semaphore 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

在信号量上我们定义两种操作:

  1. acquire(获取) 当一个线程调用 acquire 操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。
  2. release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。

举例:停车场只有三个停车位,但是有六辆车需要停。停车场每辆车停留的时间有限,当停车场有空时,发出信号,外面的车辆可以进入停车场。源码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.atguigu.juc;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
public static void main(String[] args) {
// 三个停车位
Semaphore semaphore = new Semaphore(3);

// 每个线程一辆车
for (int i = 1; i <=6 ; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取信号量
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 进入停车场");
// 模拟车在里面停留
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放信号量
System.out.println(Thread.currentThread().getName() + " 开出停车场");
semaphore.release();
}
}
}, "车辆" + String.valueOf(i)).start();
}
}
}

读写锁

Java 并发库中 ReetrantReadWriteLock 实现了 ReadWriteLock 接口并添加了可重入的特性。ReetrantReadWriteLock 读写锁的效率明显高于 synchronized 关键字。ReetrantReadWriteLock 读写锁的实现中,读锁使用共享模式;写锁使用独占模式,换句话说,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。ReetrantReadWriteLock 读写锁的实现中,需要注意的,当有读锁时,写锁就不能获得;而当有写锁时,除了获得写锁的这个线程可以获得读锁外,其他线程不能获得读锁。

以下代码实例就是没有加读写锁的情况,写操作 set 是一起进行的,并没有独占。代码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.atguigu.juc;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

class MyCache{
// volatile 表示这个对象是经常变化的
private volatile Map<String, Object> map = new HashMap<>();

// 设置值
public void set(String key, Object value) {
try{
System.out.println("线程:" + Thread.currentThread().getName() + "正在写"+ key);
// 暂停一会儿线程
TimeUnit.MILLISECONDS.sleep(100);
map.put(key, value);
System.out.println("线程:" + Thread.currentThread().getName() + "写入"+ key+"完成");
}catch(Exception e) {
e.printStackTrace();
}finally{

}
}

// 获取值
public Object get(String key){
Object result = null;
try{
System.out.println("线程:" + Thread.currentThread().getName() + "正在获取"+ key + "的值");
result = map.get(key);
System.out.println("线程:" + Thread.currentThread().getName() + "正在获取"+ key + "的值结束");

}catch(Exception e) {
e.printStackTrace();
}finally{

}
return result;
}

}

public class ReadWriteLockDemo {
// 五个线程进行写操作,五个线程进行读操作

public static void main(String[] args) throws InterruptedException {
MyCache myCache = new MyCache();

for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(new Runnable() {
@Override
public void run() {
myCache.set(String.valueOf(num), UUID.randomUUID().toString().replace("-", ""));
}
}, String.valueOf(num)).start();
}

TimeUnit.MILLISECONDS.sleep(100);

for (int i = 5; i < 10; i++) {
final int num = i;
new Thread(new Runnable() {
@Override
public void run() {
myCache.get(String.valueOf(num));
}
}, String.valueOf(num)).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
线程:1正在写1
线程:4正在写4
线程:3正在写3
线程:2正在写2
线程:0正在写0
线程:5正在获取5的值
线程:5正在获取5的值结束
线程:6正在获取6的值
线程:6正在获取6的值结束
线程:7正在获取7的值
线程:7正在获取7的值结束
线程:8正在获取8的值
线程:8正在获取8的值结束
线程:9正在获取9的值
线程:9正在获取9的值结束
线程:4写入4完成
线程:2写入2完成
线程:1写入1完成
线程:3写入3完成
线程:0写入0完成

以下代码是加上读写锁之后的,可以看见写操作是独占的,读操作的并发的。代码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.atguigu.juc;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class MyCache2{
// volatile 表示这个对象是经常变化的
private volatile Map<String, Object> map = new HashMap<>();

// 创建一把读写锁
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

// 设置值
public void set(String key, Object value) {
try{
// 加上写锁
readWriteLock.writeLock().lock();
System.out.println("线程:" + Thread.currentThread().getName() + "正在写"+ key);
// 暂停一会儿线程
TimeUnit.MILLISECONDS.sleep(100);
map.put(key, value);
System.out.println("线程:" + Thread.currentThread().getName() + "写入"+ key+"完成");
}catch(Exception e) {
e.printStackTrace();
}finally{
// 最后要释放写锁
readWriteLock.writeLock().unlock();
}
}

// 获取值
public Object get(String key){
Object result = null;
try{
// 加上读锁
readWriteLock.readLock().lock();
System.out.println("线程:" + Thread.currentThread().getName() + "正在获取"+ key + "的值");
result = map.get(key);
System.out.println("线程:" + Thread.currentThread().getName() + "正在获取"+ key + "的值结束");
}catch(Exception e) {
e.printStackTrace();
}finally{
// 最后要释放读锁
readWriteLock.readLock().unlock();
}
return result;
}

}

public class ReadWriteLockDemo2 {
// 五个线程进行写操作,五个线程进行读操作

public static void main(String[] args) throws InterruptedException {
MyCache2 myCache2 = new MyCache2();

for (int i = 0; i < 5; i++) {
final int num = i;
new Thread(new Runnable() {
@Override
public void run() {
myCache2.set(String.valueOf(num), UUID.randomUUID().toString().replace("-", ""));
}
}, String.valueOf(num)).start();
}

TimeUnit.MILLISECONDS.sleep(100);

for (int i = 5; i < 10; i++) {
final int num = i;
new Thread(new Runnable() {
@Override
public void run() {
myCache2.get(String.valueOf(num));
}
}, String.valueOf(num)).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
线程:0正在写0
线程:0写入0完成
线程:1正在写1
线程:1写入1完成
线程:4正在写4
线程:4写入4完成
线程:2正在写2
线程:2写入2完成
线程:3正在写3
线程:3写入3完成
线程:5正在获取5的值
线程:5正在获取5的值结束
线程:6正在获取6的值
线程:7正在获取7的值
线程:9正在获取9的值
线程:9正在获取9的值结束
线程:8正在获取8的值
线程:8正在获取8的值结束
线程:6正在获取6的值结束
线程:7正在获取7的值结束

现象变成了写操作是独占的了,也就是一个写操作完成之后在进行下一个写操作。读操作还是并发的

阻塞队列

简介

线程1往阻塞队列里添加元素,线程2从阻塞队列里移除元素,当队列是空的,从队列中获取元素的操作将会被阻塞。当队列是满的,从队列中添加元素的操作将会被阻塞,试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素。试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增。

为什么需要 BlockingQueue?好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了。在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

实现类

  1. ArrayBlockingQueue:由数组结构组成的有界阻塞队列。(需要指定大小)
  2. LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
  3. PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
  4. DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
  5. SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
  6. LinkedTransferQueue:由链表组成的无界阻塞队列。
  7. LinkedBlockingDeque:由链表组成的双向阻塞队列。

125标记为粗体的比较常用。

常用方法

代码实例

代码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.atguigu.juc;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);

/*
// 抛出异常
System.out.println(blockingQueue.add("a")); // true
System.out.println(blockingQueue.add("b")); // true
System.out.println(blockingQueue.add("c")); // true
// blockingQueue.add("d"); // java.lang.IllegalStateException: Queue full

System.out.println(blockingQueue.remove()); // a
System.out.println(blockingQueue.remove()); // b
System.out.println(blockingQueue.remove()); // c
// System.out.println(blockingQueue.remove()); // java.util.NoSuchElementException
*/

/*
// 特殊值
System.out.println(blockingQueue.offer("a")); // true
System.out.println(blockingQueue.offer("b")); // true
System.out.println(blockingQueue.offer("c")); // true
// System.out.println(blockingQueue.offer("d")); // false

System.out.println(blockingQueue.poll()); // a
System.out.println(blockingQueue.poll()); // b
System.out.println(blockingQueue.poll()); // c
// System.out.println(blockingQueue.poll()); // null
*/

/*
// 阻塞
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("b");
// blockingQueue.put("d"); // 会导致阻塞

System.out.println(blockingQueue.take()); // a
System.out.println(blockingQueue.take()); // b
System.out.println(blockingQueue.take()); // c
// System.out.println(blockingQueue.take()); // 会导致阻塞
*/

// 超时
System.out.println(blockingQueue.offer("a", 4, TimeUnit.SECONDS)); // true
System.out.println(blockingQueue.offer("b", 4, TimeUnit.SECONDS)); // true
System.out.println(blockingQueue.offer("c", 4, TimeUnit.SECONDS)); // true
// System.out.println(blockingQueue.offer("d", 4, TimeUnit.SECONDS)); // 超过4s返回 false

System.out.println(blockingQueue.poll(4, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(4, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(4, TimeUnit.SECONDS));
// System.out.println(blockingQueue.poll(4, TimeUnit.SECONDS)); // 超过4s返回 null
}
}

线程池(非常重要)

获得多线程的方式主要有四种,之前的继承 Thread 类和实现 Runable 接口。还有现在的实现 Callable 接口和 java 线程池。

简介

线程池的优势:
线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。它的主要特点为:线程复用;控制最大并发数;管理线程。

Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor 这几个类

具体实现

左边的 Executors 是工具类。这里类比 List 对象的创建,既可以用 new ArrayList ,也可以使用 Arrays.asList。同理创建线程池可以使用 Executors 工具类,也可以使用 new 的方式。

  1. Executors.newFixedThreadPool(int) : 创建一个线程池,一池有N个固定的线程,有固定线程数的线程。执行长期任务性能好。

    1
    2
    3
    4
    5
    6
    7
    8
    9
      public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    /*
    newFixedThreadPool 创建的线程池 corePoolSize 和 maximumPoolSize 值是相等的,它使用的是LinkedBlockingQueue 阻塞队列。该队列最大可以存储 Integer.MAX_VALUE 个元素。
    */
  2. Executors.newSingleThreadExecutor():一个任务一个任务的执行,一池一线程。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(11,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    /*
    newSingleThreadExecutor 创建的线程池 corePoolSize 和 maximumPoolSize 值都是1,它使用的是LinkedBlockingQueue 阻塞队列。该队列最大可以存储 Integer.MAX_VALUE 个元素。
    */
  3. Executors.newCachedThreadPool():执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强。即自动扩充。

    1
    2
    3
    4
    5
    6
    7
    8
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    /*
    newCachedThreadPool 创建的线程池将 corePoolSize 设置为0,将 maximumPoolSize 设置为Integer.MAX_VALUE,它使用的是 SynchronousQueue 队列,该队列最大可以一个值,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
    */

使用示例:代码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.atguigu.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
public static void main(String[] args) {
// 创建一个线程池,并指定大小为 3
// ExecutorService threadPool = Executors.newFixedThreadPool(3);

// 创建一个线程池,里面就只有一个线程
// ExecutorService threadPool = Executors.newSingleThreadExecutor();

// 创建一个线程池,里面的线程自动扩充
ExecutorService threadPool = Executors.newCachedThreadPool();

try{
// 将 10 个或者更多的任务交给线程池中的线程执行
for (int i = 0; i < 30; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "执行了任务!");
}
});
}
}catch(Exception e) {
e.printStackTrace();
}finally{
threadPool.shutdown();
}
}
}

ThreadPoolExecutor

FixedThreadPool, SingleThreadExecutor,CachedThreadPool 底层都是调用 ThreadPoolExecutor 的,只是参数不同而已。ThreadPoolExecutor 的参数有七个,列举如下:

  1. corePoolsize: 线程池中常驻的核心线程数。核心线程是懒加载的,也就是说例如 corePoolsize 是 5,new 的时候并没有真正的创建出 5 个线程出来,而是在调用 execute 方法的时候才创建。
  2. maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
  3. keepAliveTime:多余的空闲线程的存活时间,当前池中线程数量超过 corePoolSize 时,当空闲时间达到keepAliveTime 时,多余线程会被销毁直到只剩下 corePoolSize 个线程为止。
  4. unit:keepAliveTime的单位
  5. workQueue:任务队列,被提交但尚未被执行的任务
  6. threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可
  7. handler:拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的 runnable 的策略。

线程池底层工作原理

  1. 在创建了线程池后,线程池中的线程数为零。
  2. 当调用 execute() 方法添加一个请求任务时,线程池会做出如下判断:
    • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
    • 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务,注意⚠️这里不是将它放入队列,因为队列已经满了,而是马上创建线程来执行这个任务,创建的线程是直接执行这个任务,不去队列里面取任务。当执行完这个任务,才回去队列里面取任务。
    • 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

哪种方式创建线程池最好

答案是一个都不用,只使用自定义的!!!

线程池拒绝策略

  1. AbortPolicy(默认):直接抛出 RejectedExecutionException 异常阻止系统正常运行。即当核心线程数满后,且阻塞队列也满后,如果是该策略,那么会抛出异常。
  2. CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。 即当核心线程数满后,且阻塞队列也满后,如果是该策略,任务从哪儿来,就回到哪儿去。
  3. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。即当核心线程数满后,且阻塞队列也满后,如果是该策略,那么新的线程任务会替换掉队列中的最老的任务。
  4. DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。即当核心线程数满后,且阻塞队列也满后,如果是该策略,任务直接就丢失了。

自定义线程池

代码地址

AbortPolicy

上面了解到,实际生产环境中,我们并不会去用 Excutors 来创建线程池,一般都是自己使用 ThreadPoolExecutor 来实现。下面就是示例,并且用到了各个拒绝策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.atguigu.juc;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
3L, // 回收非核心线程的时间
TimeUnit.SECONDS, // 回收非核心线程的时间单位
new ArrayBlockingQueue<Runnable>(3), // 阻塞队列
Executors.defaultThreadFactory(), // 默认线程池工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

try{
for (int i = 1; i <= 2 ; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() +"执行");
}
});
}
}catch(Exception e) {
e.printStackTrace();
}finally{
threadPoolExecutor.shutdown();
}
}
}

上面使用的是 AbortPolicy 拒绝策略。当 for 循环中的 i<= 5 的时候,都是核心线程在执行。原因是 5 大于核心线程数,大于后,会将任务放入阻塞队列中,阻塞队列大小是3,正好将 3 个放进去。(类比去银行办理业务,只有两个柜台在办理,三个等候的位置,那么来五个人是核心线程可以处理的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try{
for (int i = 1; i <= 5 ; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + "执行");
}
});
}
}catch(Exception e) {
e.printStackTrace();
}finally{
threadPoolExecutor.shutdown();
}
1
2
3
4
5
线程:pool-1-thread-2执行  // 两个核心线程就能完成工作
线程:pool-1-thread-1执行
线程:pool-1-thread-2执行
线程:pool-1-thread-1执行
线程:pool-1-thread-2执行

此时,如果将 for 循环中的 i 的值改为 6 的时候,核心线程就处理不过来了,因为核心线程数为 2,队列里面最多可以阻塞3个,还剩1个任务没有地方处理,但是现在线程数量没有达到最大的线程数,此时可以启动新的线程来处理任务,只要保证线程的最大数量不能超过 5 个,即 核心线程 + 新的非核心线程 <= 5 即可。所以 i 的值为 6-8 之间都需要创建新的线程来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try{
for (int i = 1; i <= 8 ; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + "执行");
}
});
}
}catch(Exception e) {
e.printStackTrace();
}finally{
threadPoolExecutor.shutdown();
}
1
2
3
4
5
6
7
8
线程:pool-1-thread-1执行  
线程:pool-1-thread-3执行 // 两个线程已经不能完成任务了,又创建了 3,4,5 三个非核心线程来帮忙
线程:pool-1-thread-1执行
线程:pool-1-thread-2执行
线程:pool-1-thread-1执行
线程:pool-1-thread-3执行
线程:pool-1-thread-4执行
线程:pool-1-thread-5执行

但是一旦超过 8,假设 i 的值为 9,此时核心线程为2,队列阻塞3个,创建新线程运行3个,那就还剩下一个任务没有被处理,那么此时就会走拒绝策略了。而此时的拒绝策略是 AbortPolicy,所以直接抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try{
for (int i = 1; i <= 9 ; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + "执行");
}
});
}
}catch(Exception e) {
e.printStackTrace();
}finally{
threadPoolExecutor.shutdown();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
线程:pool-1-thread-1执行
线程:pool-1-thread-4执行
线程:pool-1-thread-3执行
线程:pool-1-thread-2执行
线程:pool-1-thread-3执行
线程:pool-1-thread-5执行
线程:pool-1-thread-4执行
线程:pool-1-thread-1执行
java.util.concurrent.RejectedExecutionException: Task com.atguigu.juc.MyThreadPool$1@66d3c617 rejected from java.util.concurrent.ThreadPoolExecutor@63947c6b[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.atguigu.juc.MyThreadPool.main(MyThreadPool.java:22)

CallerRunsPolicy

当任务数量 i 可以被处理时,处理的方法和上面 AbortPolicy 分析一样的,这里只说明当任务数量不能被处理,运行 CallerRunsPolicy 的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.atguigu.juc;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
3L, // 回收非核心线程的时间
TimeUnit.SECONDS, // 回收非核心线程的时间单位
new ArrayBlockingQueue<Runnable>(3), // 阻塞队列
Executors.defaultThreadFactory(), // 默认线程池工厂
// new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
new ThreadPoolExecutor.CallerRunsPolicy()
);

try{
for (int i = 1; i <= 33 ; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + "执行");
}
});
}
}catch(Exception e) {
e.printStackTrace();
}finally{
threadPoolExecutor.shutdown();
}
}
}
1
2
3
4
5
6
7
8
9
10
线程:pool-1-thread-1执行
线程:pool-1-thread-4执行
线程:pool-1-thread-3执行
线程:pool-1-thread-5执行
线程:pool-1-thread-2执行
线程:pool-1-thread-3执行
线程:pool-1-thread-4执行
线程:pool-1-thread-1执行
线程:main执行 // 当处理不了的时候,从哪儿来就回哪儿去
......

DiscardOldestPolicy

当任务数量 i 可以被处理时,处理的方法和上面 AbortPolicy 分析一样的,这里只说明当任务数量不能被处理,运行 DiscardOldestPolicy 的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.atguigu.juc;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
3L, // 回收非核心线程的时间
TimeUnit.SECONDS, // 回收非核心线程的时间单位
new ArrayBlockingQueue<Runnable>(3), // 阻塞队列
Executors.defaultThreadFactory(), // 默认线程池工厂
// new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
// new ThreadPoolExecutor.CallerRunsPolicy()
new ThreadPoolExecutor.DiscardOldestPolicy()
);

try{
for (int i = 1; i <= 33 ; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + "执行");
}
});
}
}catch(Exception e) {
e.printStackTrace();
}finally{
threadPoolExecutor.shutdown();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
线程:pool-1-thread-1执行
线程:pool-1-thread-3执行
线程:pool-1-thread-1执行
线程:pool-1-thread-2执行
线程:pool-1-thread-3执行
线程:pool-1-thread-2执行
线程:pool-1-thread-5执行
线程:pool-1-thread-4执行
线程:pool-1-thread-1执行
线程:pool-1-thread-4执行
线程:pool-1-thread-4执行
线程:pool-1-thread-4执行
线程:pool-1-thread-5执行
线程:pool-1-thread-2执行
线程:pool-1-thread-3执行
线程:pool-1-thread-1执行
线程:pool-1-thread-4执行

有一部分老的任务没有执行,被新的任务给替代了

DiscardPolicy

当任务数量 i 可以被处理时,处理的方法和上面 AbortPolicy 分析一样的,这里只说明当任务数量不能被处理,运行 DiscardPolicy 的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.atguigu.juc;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
3L, // 回收非核心线程的时间
TimeUnit.SECONDS, // 回收非核心线程的时间单位
new ArrayBlockingQueue<Runnable>(3), // 阻塞队列
Executors.defaultThreadFactory(), // 默认线程池工厂
// new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
// new ThreadPoolExecutor.CallerRunsPolicy()
// new ThreadPoolExecutor.DiscardOldestPolicy()
new ThreadPoolExecutor.DiscardPolicy()
);

try{
for (int i = 1; i <= 33 ; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程:" + Thread.currentThread().getName() + "执行");
}
});
}
}catch(Exception e) {
e.printStackTrace();
}finally{
threadPoolExecutor.shutdown();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
线程:pool-1-thread-1执行
线程:pool-1-thread-3执行
线程:pool-1-thread-2执行
线程:pool-1-thread-5执行
线程:pool-1-thread-2执行
线程:pool-1-thread-3执行
线程:pool-1-thread-4执行
线程:pool-1-thread-1执行
线程:pool-1-thread-3执行
线程:pool-1-thread-2执行
线程:pool-1-thread-5执行

尽自己最大的努力执行,能执行多少就执行多少。

分支合并框架

异步回调